/*chunk table data structure */
#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <dirent.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBCONTROL

#include "limits.h"
#include "adt.h"
#include "sysy_lib.h"
#include "bmap.h"
#include "net_table.h"
#include "configure.h"
#include "mem_cache.h"
#include "net_global.h"
#include "table_proto.h"
#include "../chunk/chunk_proto.h"
#include "lich_md.h"
#include "ylog.h"
#include "dbg.h"

STATIC int __table_proto_xattr_load(table_proto_t *table_proto);

typedef struct {
        char *key;
        char *value;
        uint32_t loc;
        char buf[0];
} xattr_entry_t;

typedef xattr_entry_t entry_t;

static int __cmp(const void *v1, const void *v2)
{
        const entry_t *ent = v1;
        const char *key = v2;

        DBUG("cmp %s %s\n", ent->key, key);

        return strcmp(ent->key, key);
}

static uint32_t __key(const void *i)
{
        return hash_str((char *)i);
}

STATIC int __table_proto_xattr_insert(hashtable_t xattr, const char *key,
                                      const char *value, int loc, int flag)
{
        int ret, replace;
        entry_t *ent;

        ret = ymalloc((void **)&ent, sizeof(*ent) + strlen(key) + strlen(value) + 2);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent->key = ent->buf;
        ent->value = &ent->buf[strlen(key) + 1];
        ent->loc = loc;
        strcpy(ent->key, key);
        strcpy(ent->value, value);

        if (flag & O_EXCL)
                replace = 0;
        else
                replace = 1;

        ret = hash_table_insert(xattr, (void *)ent, ent->key, replace);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_read_xmap(table_proto_t *table_proto, char *map)
{
        int offset, size;

        size = TABLE_PROTO_XMAP_AREA;
        offset = TABLE_PROTO_XMAP;

        return __table_proto_read(table_proto->chkinfo, table_proto->chkstat,
                                  map, size, offset, NULL);
}

STATIC int __table_proto_loadxmap(table_proto_t *table_proto)
{
        int ret;
        char *buf;

        memset(&table_proto->xattr->bmap, 0x0, sizeof(table_proto->xattr->bmap));

        ret = ymalloc((void **)&buf, TABLE_PROTO_XMAP_AREA);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table_proto_read_xmap(table_proto, buf);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        bmap_load(&table_proto->xattr->bmap, buf, (TABLE_PRORO_XATTR_ITEM_COUNT / CHAR_BIT));

        DINFO("load  xmap of table "CHKID_FORMAT" count %llu\n",
              CHKID_ARG(&table_proto->chkid), (LLU)table_proto->xattr->bmap.nr_one);

        return 0;
err_free:
        yfree((void **)&buf);
err_ret:
        return ret;
}

STATIC int __table_proto_write_xmap(table_proto_t *table_proto, const char *map, int _size)
{
        int ret, size, offset;
        char *buf;

#ifdef HAVE_STATIC_ASSERT
        static_assert(TABLE_PROTO_XMAP_AREA  <= PAGE_SIZE, "map");
#endif

        YASSERT(_size <= TABLE_PROTO_XMAP_AREA);
        buf = mem_cache_calloc(MEM_CACHE_4K, 1);

        size = TABLE_PROTO_XMAP_AREA;
        offset = TABLE_PROTO_XMAP;

        if (_size < size) {
                memcpy(buf, map, _size);
                ret = __table_proto_write(table_proto->chkinfo, table_proto->chkstat,
                                           buf, size, offset);
        } else {
                ret = __table_proto_write(table_proto->chkinfo, table_proto->chkstat,
                                          map, size, offset);
        }
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

STATIC int __table_proto_xattr_read(table_proto_t *table_proto, char *buf, int from, int to, int *count)
{
        int ret, size, newsize;

        YASSERT(from * TABLE_PRORO_XATTR_ITEM_SIZE + TABLE_PROTO_ITEM
                < (int)LICH_CHUNK_SPLIT);

        size = (to - from) * TABLE_PRORO_XATTR_ITEM_SIZE;
        ret =  __table_proto_read(table_proto->chkinfo, table_proto->chkstat, buf, size,
                                  from * TABLE_PRORO_XATTR_ITEM_SIZE + TABLE_PROTO_XATTR, &newsize);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        YASSERT(from * TABLE_PRORO_XATTR_ITEM_SIZE + TABLE_PROTO_ITEM
                + ret <= (int)LICH_CHUNK_SPLIT);

        *count = newsize / TABLE_PRORO_XATTR_ITEM_SIZE;

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_xattr_loaditem(table_proto_t *table_proto, const char *buf,
                                        int from, int _to)
{
        int ret, loc, to;
        const char *value, *key;

        YASSERT(from < (int)table_proto->xattr->bmap.size);
        to = _to > (int)table_proto->xattr->bmap.size ? (int)table_proto->xattr->bmap.size : _to;

        for (loc = from; loc < to; loc++) {
                if (bmap_get(&table_proto->xattr->bmap, loc) == 0)
                        continue;

                key = (void *)buf + (loc - from) * TABLE_PRORO_XATTR_ITEM_SIZE;
                value = key + TABLE_PRORO_XATTR_ITEM_SIZE / 2;

                //DBUG("load value '%s' at loc %u offset %u\n", value, loc, offset +  i * table_proto->item_size);
                //DBUG("real loc %u\n", offset + TABLE_PROTO_XMAP_AREA + i * table_proto->item_size);

#if 1
                if ((strlen(key) == 0) || (strlen(value) == 0)) {
                        DERROR("load %s %s into "CHKID_FORMAT" loc %u fail\n",
                                key, value, CHKID_ARG(&table_proto->chkinfo->id), loc);
                        ret = EIO;
                        GOTO(err_ret, ret);
                }
#else
                if ((strlen(key) == 0) || (strlen(value) == 0)) {
                        DERROR("load (%s) (%s) into "CHKID_FORMAT" loc %u fail\n",
                                key, value, CHKID_ARG(&table_proto->chkinfo->id), loc);
                        continue;
                }
#endif

                DINFO("insert %s %s into "CHKID_FORMAT" loc %u\n",
                      key, value, CHKID_ARG(&table_proto->chkinfo->id), loc);

                ret = __table_proto_xattr_insert(table_proto->xattr->tab, key, value, loc, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_xattr_load__(table_proto_t *table_proto)
{
        int ret, loc, buf_count, count;
        char buf[MAX_BUF_LEN];

        DBUG("load table "CHKID_FORMAT"\n", CHKID_ARG(&table_proto->chkid));

        loc = 0;
        buf_count = MAX_BUF_LEN / TABLE_PRORO_XATTR_ITEM_SIZE;
        while (loc < (int)table_proto->xattr->bmap.size) {
                if (bmap_range_empty(&table_proto->xattr->bmap, loc, loc + buf_count)) {
                        loc += buf_count;
                        continue;
                }

                ret = __table_proto_xattr_read(table_proto, buf, loc, loc + buf_count, &count);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                //DBUG("load table %s offset %u size %u\n",
                //CHKID_ARG(&table_proto->chkid), offset + TABLE_PROTO_XMAP_AREA, ret);

                if (count == 0)
                        break;

                ret = __table_proto_xattr_loaditem(table_proto, buf, loc, loc + count);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                loc += count;
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_xattr_commit(table_proto_t *table_proto, const void *item,
                                int loc, int newitem)
{
        int ret;

        ret =  __table_proto_write(table_proto->chkinfo, table_proto->chkstat, item,
                                   TABLE_PRORO_XATTR_ITEM_SIZE,
                                   loc * TABLE_PRORO_XATTR_ITEM_SIZE + TABLE_PROTO_XATTR);
        //ret = __table_proto_write_item(table_proto, value, TABLE_PRORO_XATTR_ITEM_SIZE, loc);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (newitem) {
                ret = bmap_set(&table_proto->xattr->bmap, loc);
                if (unlikely(ret))
                        YASSERT(0);

                DBUG("commit at loc %u offset %u\n", loc,
                     loc * TABLE_PRORO_XATTR_ITEM_SIZE);

                ret = __table_proto_write_xmap(table_proto, table_proto->xattr->bmap.bits,
                                              (TABLE_PRORO_XATTR_ITEM_COUNT / CHAR_BIT));
                if (unlikely(ret)) {
                        bmap_del(&table_proto->xattr->bmap, loc);
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}


STATIC int __table_proto_xattr_set(table_proto_t *table_proto, const char *key, const char *value, int flag)
{
        int ret, loc;
        entry_t *ent;
        char item[TABLE_PRORO_XATTR_ITEM_SIZE];

        YASSERT(strcmp(key, STORAGE_AREA_KEY));
        
        strcpy(item, key);
        strcpy(item + TABLE_PRORO_XATTR_ITEM_SIZE / 2, value);

        if ((strlen(key) == 0) || (strlen(value) == 0)) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        if (strlen(key) + 1 > TABLE_PRORO_XATTR_ITEM_SIZE / 2) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        if (strlen(value) + 1 > TABLE_PRORO_XATTR_ITEM_SIZE / 2) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }
        
        ent = hash_table_find(table_proto->xattr->tab, (void *)key);
        if (ent) {
                if (!(flag & O_EXCL)) {
                        DINFO("update %s @ "CHKID_FORMAT" loc %u\n", key, CHKID_ARG(&table_proto->chkinfo->id), ent->loc);
                        
                        ret = __table_proto_xattr_commit(table_proto, item, ent->loc, 0);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        ret = __table_proto_xattr_insert(table_proto->xattr->tab, key, value, ent->loc, flag);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        goto out;
                } else {
                        ret = EEXIST;
                        GOTO(err_ret, ret);
                }
        } else {
                loc = bmap_get_empty(&table_proto->xattr->bmap);
                if (loc == -1) {
                        ret = ENOSPC;
                        GOTO(err_ret, ret);
                }

                DINFO("insert %s @ "CHKID_FORMAT" loc %u\n", key, CHKID_ARG(&table_proto->chkinfo->id), loc);
                
                ret = __table_proto_xattr_commit(table_proto, item, loc, 1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __table_proto_xattr_insert(table_proto->xattr->tab, key, value, loc, flag);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

out:
        return 0;
err_ret:
        return ret;
}

STATIC int __table_proto_xattr_get(table_proto_t *table_proto, const char *key, char *value)
{
        int ret;
        entry_t *ent;

        ent = hash_table_find(table_proto->xattr->tab, (void *)key);
        if (ent == NULL) {
                ret = ENOKEY;
                GOTO(err_ret, ret);
        }

        strcpy(value, ent->value);

        return 0;
err_ret:
        return ret;
}

STATIC int __xattr_dump(void *buf, void *_ent)
{
        entry_t *ent = _ent;

        sprintf(buf + strlen(buf), "%s: %s\n", ent->key, ent->value);

        return 0;
}

STATIC int __table_proto_xattr_list(table_proto_t *table_proto, char *buf)
{
        buf[0] = '\0';
        return hash_iterate_table_entries(table_proto->xattr->tab, __xattr_dump, buf);
}

STATIC int __table_proto_xattr_remove(table_proto_t *table_proto, const char *key)
{
        int ret;
        entry_t *ent;

        ent = hash_table_find(table_proto->xattr->tab, (void *)key);
        if (ent == NULL) {
                ret = ENOKEY;
                GOTO(err_ret, ret);
        }

        DINFO("remove %s @ "CHKID_FORMAT" left %u\n", key,
              CHKID_ARG(&table_proto->chkid), table_proto->xattr->bmap.nr_one);

        ret = bmap_del(&table_proto->xattr->bmap, ent->loc);
        if (unlikely(ret))
                YASSERT(0);

        ret = __table_proto_write_xmap(table_proto, table_proto->xattr->bmap.bits,
                                       (TABLE_PRORO_XATTR_ITEM_COUNT / CHAR_BIT));
        if (unlikely(ret)) {
                bmap_set(&table_proto->xattr->bmap, ent->loc);
                GOTO(err_ret, ret);
        }

        ret = hash_table_remove(table_proto->xattr->tab, (void *)key, (void **)&ent);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        yfree((void **)&ent);

        return 0;
err_ret:
        return ret;        
}

STATIC int __table_proto_xattr_load(table_proto_t *table_proto)
{
        int ret;

        ret = __table_proto_loadxmap(table_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __table_proto_xattr_load__(table_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int table_xattr_init(table_proto_t *table_proto)
{
        int ret;

        YASSERT(table_proto->chkid.type == __VOLUME_CHUNK__
                || table_proto->chkid.type == __POOL_CHUNK__);
        
        ret = ymalloc((void **)&table_proto->xattr, sizeof(*table_proto->xattr));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        table_proto->xattr->set = __table_proto_xattr_set;
        table_proto->xattr->get = __table_proto_xattr_get;
        table_proto->xattr->remove = __table_proto_xattr_remove;
        table_proto->xattr->list = __table_proto_xattr_list;

        table_proto->xattr->tab = hash_create_table(__cmp, __key, "xattr");
        if (table_proto->xattr->tab == NULL) {
                ret = ENOMEM;
                GOTO(err_free, ret);
        }

        ret = __table_proto_xattr_load(table_proto);
        if (unlikely(ret))
                GOTO(err_destroy, ret);

        return 0;
err_destroy:
        hash_destroy_table(table_proto->xattr->tab, NULL);
err_free:
        yfree((void **)&table_proto->xattr);
        table_proto->xattr = NULL;
err_ret:
        return ret;        
}

void table_xattr_destroy(table_proto_t *table_proto)
{
        if (table_proto->xattr) {
                hash_destroy_table(table_proto->xattr->tab, NULL);
                bmap_destroy(&table_proto->xattr->bmap);
                yfree((void **)&table_proto->xattr);
                table_proto->xattr = NULL;
        }
}
